本文へスキップ
バージョン: 5.0

RocketMQ Connect実践編 4

SFTPサーバー(ファイルデータ)-> RocketMQ Connect -> SFTPサーバー(ファイル)

準備

RocketMQの起動

  1. Linux/Unix/Mac
  2. 64ビットJDK 1.8以上;
  3. Maven 3.2.x以上;
  4. RocketMQを起動します。 RocketMQ 4.x または RocketMQ 5.x のいずれかのバージョンを使用できます。
  5. ツールを使用してRocketMQのメッセージ送受信テストを行います。

ここでは、環境変数NAMESRV_ADDRを使用して、ツールクライアントにRocketMQの名前サーバーアドレス(localhost:9876)を知らせます。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注記: RocketMQには、TopicとGroupを自動的に作成する機能があります。メッセージの送受信時に対応するTopicまたはGroupが存在しない場合、RocketMQは自動的にそれらを作成します。そのため、事前にTopicとGroupを作成する必要はありません。

コネクタランタイムのビルド

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

SFTPコネクタプラグインのビルド

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/

mvn clean package -Dmaven.test.skip=true

コンパイルされたSFTP RocketMQコネクタのjarファイルを、ランタイムロード用のプラグインディレクトリに配置します。

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

スタンドアロンモードでのコネクタワーカーの実行

connect-standalone.confファイルを修正して、RocketMQ接続アドレスなどの情報を設定します。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

設定情報の例を以下に示します。

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

スタンドアロンモードでは、RocketMQ Connectは同期チェックポイント情報を、`storePathRootDir`で指定されたローカルファイルディレクトリに永続的に保存します。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

同期チェックポイントをリセットする必要がある場合は、永続化されたチェックポイント情報ファイルを削除する必要があります。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

スタンドアロンモードでコネクタワーカーを起動するには

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

SFTPサーバーの設定

SFTP(SSH File Transfer Protocol)は、コンピュータ間での安全なファイル転送に使用されるファイル転送プロトコルです。SFTPはSSH(Secure Shell)プロトコル上に構築されており、暗号化と認証を利用しています。

macOSの組み込みSFTPサービス(「リモートログイン」アクセスを有効にすることで)を使用します。詳細な手順については、Macへのリモートコンピュータからのアクセスを許可するドキュメントを参照してください。

ソーステストファイルの作成

source.txtという名前のテストファイルを作成し、いくつかのテストデータを書き込みます。

mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/

cd /Users/YourUsername/rocketmqconnect/sftp-test/

touch source.txt

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

SFTPサービスにログインして、正常にアクセスできることを確認します。次のコマンドを入力し、パスワードを入力します。

# sftp -P port YourUsername@hostname
sftp -P 22 YourUsername@127.0.0.1

注記: これはローカルMAC OSが提供するSFTPサービスであるため、アドレスは127.0.0.1、ポートはデフォルトの22です。

sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye

コネクタの起動

SFTPソースコネクタの起動

次のコマンドを実行してSFTPソースコネクタを起動します。このコネクタはSFTPサービスに接続してsource.txtファイルから読み取ります。ファイル内の各テキスト行に対して、コネクタは内容を解析して一般的なConnectRecordオブジェクトにパッケージ化し、それをRocketMQトピックに送信してシンクコネクタが消費できるようにします。

curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

curlリクエストがステータス: 200を返した場合、コネクタが正常に作成されたことを示します。レスポンスの例を以下に示します。

{"status":200,"body":{"connector.class":"...

ファイルソースコネクタが正常に起動したことを確認するには、次のコマンドを実行します。

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

コネクタSftpSourceConnectorの起動とターゲット状態STARTEDに設定が成功しました!!

SFTPシンクコネクタの起動

次のコマンドを実行してSFTPシンクコネクタを起動します。このコネクタはRocketMQトピックを購読してメッセージを消費し、それぞれを1行のテキストに変換して、SFTPプロトコルを使用して宛先ファイルsink.txtに書き込みます。

curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

curlリクエストがステータス: 200を返した場合、コネクタが正常に作成されたことを示します。レスポンスの例を以下に示します。

{"status":200,"body":{"connector.class":"...

ログを確認して、SFTPシンクコネクタの正常な起動を確認します。

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

コネクタSftpSinkConnectorの起動とターゲット状態STARTEDに設定が成功しました!!

次のコマンドを実行して、データが宛先ファイルに書き込まれたことを確認します。

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

sink.txtファイルが生成され、その内容がsource.txtファイルの内容と一致する場合、プロセス全体が正常に動作しています。

source.txtファイルにさらにテストデータを追加して、テストを続行します。

cd /Users/YourUsername/rocketmqconnect/sftp-test/

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

注記: rocketmq-connect-sftpはRocketMQトピックとの間でメッセージの送受信に通常のメッセージを使用するため、ファイルの内容の順序は異なる場合があります。これは順序付きメッセージとは異なり、通常のメッセージの消費では順序は保証されません。